package com.gitee.mqttclient.client;

import com.gitee.mqttclient.callback.PahoMqttCallback;
import com.gitee.mqttclient.exception.TopicSubscribeException;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;

/**
 * <p>MQTT客户端</p>
 * 使用方式：
 * <pre>
 * PahoMqttClient.create()
 *              .broker(broker)
 *              .auth(username, password)
 *              .clientId(clientId)
 *              .cleanSession(true)
 *              .automaticReconnect(true)
 *              .subscribe(topic, 1)
 *              .callback(new LinkMqttCallback())
 *              .connect();
 * </pre>
 */
public class PahoMqttClient {

    /**
     * 缓存topic
     */
    private final Set<TopicInfo> topicInfos = new LinkedHashSet<>(4);

    private MqttClient client;
    private MqttConnectOptions options;

    private MqttClientPersistence persistence;

    private String broker;
    private String clientId;
    private String username;
    private String password;
    private boolean cleanSession = MqttConnectOptions.CLEAN_SESSION_DEFAULT;
    private boolean automaticReconnect = true;
    private PahoMqttCallback callback;
    private int defaultQos = 2;

    private PahoMqttClient() {
    }

    /**
     * 创建实例
     *
     * @return 返回PahoMqttClient对象
     */
    public static PahoMqttClient create() {
        return new PahoMqttClient();
    }

    /**
     * 设置用户名密码
     *
     * @param username 用户名
     * @param password 密码
     * @return 返回PahoMqttClient对象
     */
    public PahoMqttClient auth(String username, String password) {
        this.username = username;
        this.password = password;
        return this;
    }

    /**
     * 设置broker地址
     *
     * @param broker broker地址
     * @return 返回PahoMqttClient对象
     */
    public PahoMqttClient broker(String broker) {
        this.broker = broker;
        return this;
    }

    /**
     * 设置clientId
     *
     * @param clientId clientId
     * @return 返回PahoMqttClient对象
     */
    public PahoMqttClient clientId(String clientId) {
        this.clientId = clientId;
        return this;
    }

    /**
     * 设置cleanSession
     *
     * @param cleanSession 如果false，保留session。默认true
     * @return 返回PahoMqttClient对象
     */
    public PahoMqttClient cleanSession(boolean cleanSession) {
        this.cleanSession = cleanSession;
        return this;
    }

    /**
     * 设置是否自动重连
     *
     * @param automaticReconnect true，自动重连。默认false
     * @return 返回PahoMqttClient对象
     */
    public PahoMqttClient automaticReconnect(boolean automaticReconnect) {
        this.automaticReconnect = automaticReconnect;
        return this;
    }

    /**
     * 设置默认的qos
     *
     * @param qos qos，0,1,2
     * @return 返回PahoMqttClient对象
     */
    public PahoMqttClient defaultQos(int qos) {
        MqttMessage.validateQos(qos);
        this.defaultQos = qos;
        return this;
    }

    /**
     * 订阅主题，使用默认的qos
     *
     * @param topic 主题
     * @return 返回PahoMqttClient对象
     * @see #defaultQos 设置全局qos等级
     */
    public PahoMqttClient subscribe(String topic) {
        this.subscribe(topic, defaultQos);
        return this;
    }

    /**
     * 订阅主题，需要在连接前调用，如果需要在连接后订阅主题使用 {@link #subscribeTopic(java.lang.String, int)}
     *
     * @param topic 主题
     * @param qos   qos
     * @return 返回PahoMqttClient对象
     * @see #subscribeTopic
     */
    public PahoMqttClient subscribe(String topic, int qos) {
        MqttMessage.validateQos(qos);
        this.topicInfos.add(new TopicInfo(topic, qos));
        return this;
    }

    /**
     * 订阅主题，
     *
     * @param topic 主题
     * @param qos   qos
     * @throws MqttException
     */
    public void subscribeTopic(String topic, int qos) throws MqttException {
        MqttMessage.validateQos(qos);
        this.client.subscribe(topic, qos);
        this.topicInfos.add(new TopicInfo(topic, qos));
    }

    /**
     * 设置消息回调
     *
     * @param callback 回调
     * @return 返回PahoMqttClient对象
     */
    public PahoMqttClient callback(PahoMqttCallback callback) {
        this.callback = callback;
        return this;
    }

    /**
     * 设置持久化
     *
     * @param persistence 持久化对象
     * @return 返回PahoMqttClient对象
     */
    public PahoMqttClient persistence(MqttClientPersistence persistence) {
        this.persistence = persistence;
        return this;
    }

    /**
     * 设置连接配置
     *
     * @param mqttConnectOptions 连接配置
     * @return 返回PahoMqttClient对象
     */
    public PahoMqttClient mqttConnectOptions(MqttConnectOptions mqttConnectOptions) {
        this.options = mqttConnectOptions;
        return this;
    }

    /**
     * 连接到broker。此方法也可用于重连
     * <p>
     * 执行此方法才会真正创建MqttClient对象并连接
     * </p>
     *
     * @throws MqttException 连接错误抛出异常
     */
    public PahoMqttClient connect() throws MqttException {
        this.check();
        if (this.client == null) {
            if (persistence == null) {
                persistence = new MemoryPersistence();
            }
            this.client = new MqttClient(broker, clientId, persistence);
            // 设置回调
            this.callback.setPahoMqttClient(this);
            this.client.setCallback(callback);
        }
        // 建立连接
        this.doConnect();
        return this;
    }

    /**
     * 发布消息，使用默认qos
     *
     * @param topic   主题
     * @param payload 消息内容
     * @throws MqttException
     * @see #defaultQos
     */
    public void publish(String topic, byte[] payload) throws MqttException {
        this.publish(topic, payload, defaultQos);
    }

    /**
     * 发布消息
     *
     * @param topic   主题
     * @param payload 内容
     * @param qos     qos
     * @throws MqttException
     */
    public void publish(String topic, byte[] payload, int qos) throws MqttException {
        this.publish(topic, payload, qos, false);
    }

    /**
     * 发布消息
     *
     * @param topic    主题
     * @param payload  消息体
     * @param qos      qos等级，0、1、2
     * @param retained 是否retain消息
     * @throws MqttException 用于发布消息时遇到的其他错误。例如客户端未连接。
     */
    public void publish(String topic, byte[] payload, int qos, boolean retained) throws MqttException {
        this.client.publish(topic, payload, qos, retained);
    }

    /**
     * 发布消息
     *
     * @param topic       主题
     * @param mqttMessage 消息内容
     * @throws MqttException 用于发布消息时遇到的其他错误。例如客户端未连接。
     */
    public void publish(String topic, MqttMessage mqttMessage) throws MqttException {
        this.client.publish(topic, mqttMessage);
    }

    protected void check() {
        Objects.requireNonNull(broker, "broker can not null");
        Objects.requireNonNull(clientId, "clientId can not null");
        Objects.requireNonNull(callback, "callback can not null");
    }

    protected void doConnect() throws MqttException {
        if (!client.isConnected()) {
            if (options == null) {
                options = this.createOptions();
            }
            this.client.connect(options);
        }
    }

    protected MqttConnectOptions createOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        if (username != null && password != null) {
            options.setUserName(username);
            options.setPassword(password.toCharArray());
        }
        options.setCleanSession(cleanSession);
        options.setAutomaticReconnect(automaticReconnect);
        return options;
    }

    /**
     * 是否已连接
     *
     * @return true：已连接
     */
    public boolean isConnected() {
        return client.isConnected();
    }

    /**
     * 订阅主题
     * onSuccess 订阅成功回调
     * onFail 订阅失败回调
     *
     * @throws MqttException 订阅失败
     */
    public void subscribeTopics(Consumer<TopicInfo> onSuccess, Consumer<TopicSubscribeException> onFail) {
        for (TopicInfo topicInfo : this.topicInfos) {
            try {
                this.client.subscribe(topicInfo.getTopic(), topicInfo.getQos());
                if (onSuccess != null) {
                    onSuccess.accept(topicInfo);
                }
            } catch (MqttException e) {
                if (onFail != null) {
                    onFail.accept(new TopicSubscribeException(e, topicInfo));
                }
            }
        }
    }

    /**
     * 获取订阅的主题
     *
     * @return 返回订阅的主题
     */
    public Set<TopicInfo> getTopicInfos() {
        return topicInfos;
    }

    public MqttClientPersistence getPersistence() {
        return persistence;
    }

    /**
     * 返回paho内置客户端
     *
     * @return 返回paho客户端
     */
    public MqttClient getClient() {
        return client;
    }

    public String getClientId() {
        return clientId;
    }

    public String getBroker() {
        return broker;
    }

    public MqttConnectOptions getOptions() {
        return options;
    }

    public String getUsername() {
        return username;
    }

    public String getPassword() {
        return password;
    }

    public boolean isCleanSession() {
        return cleanSession;
    }

    public boolean isAutomaticReconnect() {
        return automaticReconnect;
    }

    public PahoMqttCallback getCallback() {
        return callback;
    }


}
